{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Tabular Data Pipeline with Concurrent Steps\n",
    "\n",
    "This example demonstrates an ML pipeline that preprocesses data in two concurrent steps, trains two networks (each network's training starts only after its own preprocessing step completes), and picks the best model. It is implemented using the `PipelineController` class.\n",
    "\n",
    "The pipeline uses four tasks (each Task is created using a different notebook):\n",
    "* The pipeline controller Task (the current task)\n",
    "* A data preprocessing Task ([preprocessing_and_encoding.ipynb](https://github.com/allegroai/clearml/blob/master/examples/frameworks/pytorch/notebooks/table/preprocessing_and_encoding.ipynb))\n",
    "* A training Task ([train_tabular_predictor.ipynb](https://github.com/allegroai/clearml/blob/master/examples/frameworks/pytorch/notebooks/table/train_tabular_predictor.ipynb))\n",
    "* A comparison Task ([pick_best_model.ipynb](https://github.com/allegroai/clearml/blob/master/examples/frameworks/pytorch/notebooks/table/pick_best_model.ipynb))\n",
    "\n",
    "In this pipeline example, the data preprocessing Task and training Task are each added to the pipeline twice (each is in two steps). When the pipeline runs, the data preprocessing Task and training Task are cloned twice, and the newly cloned Tasks execute. The Task they are cloned from, called the base Task, does not execute. The pipeline controller passes different data to each cloned Task by overriding parameters. In this way, the same Task can run more than once in the pipeline, but with different data.\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Prerequisite\n",
    "Before running this pipeline, download the data it needs by running the [download_and_split.ipynb](https://github.com/allegroai/clearml/blob/master/examples/frameworks/pytorch/notebooks/table/download_and_split.ipynb) notebook."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Upgrade pip to the latest version\n",
    "! pip install -U pip\n",
    "\n",
    "# If you don't have ClearML installed then uncomment this line\n",
    "# ! pip install -U clearml"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from clearml import Task, PipelineController"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "TABULAR_DATASET_ID = Task.get_task(\n",
    "    task_name=\"Download and split tabular dataset\", project_name=\"Tabular Example\"\n",
    ").id"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Create Pipeline Controller\n",
    "\n",
    "The `PipelineController` class includes functionality to create a pipeline controller, add steps to the pipeline, pass data from one step to another, control step dependencies so that a step begins only after other steps complete, run the pipeline, wait for it to complete, and clean up afterwards.\n",
    "\n",
    "Input the following parameters:\n",
    "* `name` - Name of the PipelineController task that will be created\n",
    "* `project` - Project that the controller will be associated with\n",
    "* `version` - Pipeline version number, used to uniquely identify the pipeline template execution. If not set, the pipeline's latest version is found and incremented. If no such version is found, it defaults to `1.0.0`.\n",
    "* `add_pipeline_tags` - If `True`, a `pipe: <pipeline_task_id>` tag is added to all steps (Tasks) created by this pipeline"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "pipe = PipelineController(\n",
    "    project=\"Tabular Example\",\n",
    "    name=\"tabular training pipeline\",\n",
    "    add_pipeline_tags=True,\n",
    "    version=\"0.1\",\n",
    ")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Add Preprocessing Step\n",
    "Two preprocessing nodes are added to the pipeline: `preprocessing_1` and `preprocessing_2`. Both nodes are cloned from the same base task, created from the [preprocessing_and_encoding.ipynb](https://github.com/allegroai/clearml/blob/master/examples/frameworks/pytorch/notebooks/table/preprocessing_and_encoding.ipynb) notebook. The two steps run concurrently.\n",
    "\n",
    "The preprocessing task fills in missing (NaN) values according to the parameters `fill_categorical_NA` and `fill_numerical_NA`. It connects a parameter dictionary containing keys with those names to the task. When the pipeline executes the Tasks cloned from the base Task, it overrides the values of those keys. In this way, the pipeline creates two sets of data."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "pipe.add_step(\n",
    "    name=\"preprocessing_1\",\n",
    "    base_task_project=\"Tabular Example\",\n",
    "    base_task_name=\"tabular preprocessing\",\n",
    "    parameter_override={\n",
    "        \"General/data_task_id\": TABULAR_DATASET_ID,\n",
    "        \"General/fill_categorical_NA\": \"True\",\n",
    "        \"General/fill_numerical_NA\": \"True\",\n",
    "    },\n",
    ")\n",
    "\n",
    "pipe.add_step(\n",
    "    name=\"preprocessing_2\",\n",
    "    base_task_project=\"Tabular Example\",\n",
    "    base_task_name=\"tabular preprocessing\",\n",
    "    parameter_override={\n",
    "        \"General/data_task_id\": TABULAR_DATASET_ID,\n",
    "        \"General/fill_categorical_NA\": \"False\",\n",
    "        \"General/fill_numerical_NA\": \"True\",\n",
    "    },\n",
    ")"
   ]
  },
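  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The `General/...` keys overridden in the preprocessing steps correspond to a parameter dictionary that the base task connects. The following is a minimal sketch of how a base task might expose those parameters, assuming it uses `Task.init` and `Task.connect`. The default values shown are placeholders and are not necessarily those used in the preprocessing notebook.\n",
    "\n",
    "```python\n",
    "from clearml import Task\n",
    "\n",
    "task = Task.init(project_name=\"Tabular Example\", task_name=\"tabular preprocessing\")\n",
    "\n",
    "# Placeholder defaults (assumption); the pipeline overrides them per step\n",
    "configuration_dict = {\n",
    "    \"data_task_id\": \"<dataset task id>\",\n",
    "    \"fill_categorical_NA\": True,\n",
    "    \"fill_numerical_NA\": True,\n",
    "}\n",
    "# connect() registers the dictionary as task hyperparameters and returns it\n",
    "# with any overridden values applied when the task runs as a cloned step\n",
    "configuration_dict = task.connect(configuration_dict)\n",
    "```\n",
    "\n",
    "A dictionary connected without an explicit section name is stored under the `General` section, which is why the override keys carry the `General/` prefix."
   ]
  },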
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Add Training Step"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Two training nodes are added to the pipeline: `train_1` and `train_2`. Both nodes are cloned from the same base task, created from the [train_tabular_predictor.ipynb](https://github.com/allegroai/clearml/blob/master/examples/frameworks/pytorch/notebooks/table/train_tabular_predictor.ipynb) notebook.\n",
    "\n",
    "Each training node depends upon the completion of one preprocessing node. The `parents` parameter is a list of step names indicating all steps that must complete before the new step starts. In this case, `preprocessing_1` must complete before `train_1` begins, and `preprocessing_2` must complete before `train_2` begins.\n",
    "\n",
    "The `data_task_id` parameter, which holds the ID of the task whose artifact contains the preprocessed training data, is overridden for each training step. Its value takes the form `${<stage-name>.<part-of-task>}`. In this case, `${preprocessing_1.id}` resolves to the ID of the Task executed by the `preprocessing_1` step. In this way, each training task consumes its own set of data."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "pipe.add_step(\n",
    "    name=\"train_1\",\n",
    "    parents=[\"preprocessing_1\"],\n",
    "    base_task_project=\"Tabular Example\",\n",
    "    base_task_name=\"tabular prediction\",\n",
    "    parameter_override={\"General/data_task_id\": \"${preprocessing_1.id}\"},\n",
    ")\n",
    "pipe.add_step(\n",
    "    name=\"train_2\",\n",
    "    parents=[\"preprocessing_2\"],\n",
    "    base_task_project=\"Tabular Example\",\n",
    "    base_task_name=\"tabular prediction\",\n",
    "    parameter_override={\"General/data_task_id\": \"${preprocessing_2.id}\"},\n",
    ")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Add Model Comparison Step\n",
    "The model comparison step depends upon both training nodes completing and takes the two training node task IDs to override the parameters in the base task. The IDs of the training tasks from the steps named `train_1` and `train_2` are passed to the model comparison Task. They take the form `${<stage-name>.<part-of-Task>}`."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "pipe.add_step(\n",
    "    name=\"pick_best\",\n",
    "    parents=[\"train_1\", \"train_2\"],\n",
    "    base_task_project=\"Tabular Example\",\n",
    "    base_task_name=\"pick best model\",\n",
    "    parameter_override={\"General/train_tasks_ids\": \"[${train_1.id}, ${train_2.id}]\"},\n",
    ")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Set Default Execution Queue\n",
    "Set the default execution queue for pipeline steps that do not specify an execution queue of their own. Those steps will be enqueued for execution in this queue.\n",
    "\n",
    "> **_Note_** Make sure a ClearML Agent is assigned to the queue in which the steps are enqueued; otherwise, the steps will not be executed.\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "pipe.set_default_execution_queue(default_execution_queue=\"default\")"
   ]
  },
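  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Individual steps can also name their own queue. The following is a minimal sketch, assuming a queue called `gpu_queue` exists (a hypothetical name) and using the `execution_queue` argument of `add_step`. The step name `train_gpu` is illustrative only and is not part of this pipeline.\n",
    "\n",
    "```python\n",
    "pipe.add_step(\n",
    "    name=\"train_gpu\",  # illustrative step name, not used elsewhere in this notebook\n",
    "    parents=[\"preprocessing_1\"],\n",
    "    base_task_project=\"Tabular Example\",\n",
    "    base_task_name=\"tabular prediction\",\n",
    "    parameter_override={\"General/data_task_id\": \"${preprocessing_1.id}\"},\n",
    "    execution_queue=\"gpu_queue\",  # hypothetical queue; takes precedence over the default\n",
    ")\n",
    "```\n",
    "\n",
    "Steps that omit `execution_queue` fall back to the default queue set above."
   ]
  },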
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Execute the Pipeline\n",
    "Start the pipeline! The `start` method launches the pipeline controller remotely, by default on the `services` queue (change the queue by passing `queue=<queue_name>`).\n",
    "\n",
    "To launch the pipeline control logic locally, use the `start_locally` method instead.\n",
    "\n",
    "Once the pipeline starts, wait for it to complete. Finally, clean up the pipeline processes."
   ]
  },
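  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "For debugging without agents, the controller and its steps can run on the local machine. The following is a minimal sketch, assuming the installed ClearML version supports the `run_pipeline_steps_locally` argument of `start_locally`.\n",
    "\n",
    "```python\n",
    "# Run the controller logic in this process. With run_pipeline_steps_locally=True,\n",
    "# the steps also run on this machine as sub-processes instead of being enqueued.\n",
    "pipe.start_locally(run_pipeline_steps_locally=True)\n",
    "```\n",
    "\n",
    "Use this call in place of `pipe.start()`, not in addition to it."
   ]
  },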
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Starting the pipeline (in the background)\n",
    "pipe.start()\n",
    "# Wait until pipeline terminates\n",
    "pipe.wait()\n",
    "# cleanup everything\n",
    "pipe.stop()"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.8.10"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}
